Spaces:
Paused
Paused
File size: 7,077 Bytes
d49f7bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
"""Utilities for extracting common archive formats"""
import zipfile
import tarfile
import os
import shutil
import posixpath
import contextlib
from distutils.errors import DistutilsError
from pkg_resources import ensure_directory
__all__ = [
"unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
"UnrecognizedFormat", "extraction_drivers", "unpack_directory",
]
class UnrecognizedFormat(DistutilsError):
"""Couldn't recognize the archive type"""
def default_filter(src, dst):
"""The default progress/filter callback; returns True for all files"""
return dst
def unpack_archive(
filename, extract_dir, progress_filter=default_filter,
drivers=None):
"""Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
`progress_filter` is a function taking two arguments: a source path
internal to the archive ('/'-separated), and a filesystem path where it
will be extracted. The callback must return the desired extract path
(which may be the same as the one passed in), or else ``None`` to skip
that file or directory. The callback can thus be used to report on the
progress of the extraction, as well as to filter the items extracted or
alter their extraction paths.
`drivers`, if supplied, must be a non-empty sequence of functions with the
same signature as this function (minus the `drivers` argument), that raise
``UnrecognizedFormat`` if they do not support extracting the designated
archive type. The `drivers` are tried in sequence until one is found that
does not raise an error, or until all are exhausted (in which case
``UnrecognizedFormat`` is raised). If you do not supply a sequence of
drivers, the module's ``extraction_drivers`` constant will be used, which
means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
order.
"""
for driver in drivers or extraction_drivers:
try:
driver(filename, extract_dir, progress_filter)
except UnrecognizedFormat:
continue
else:
return
else:
raise UnrecognizedFormat(
"Not a recognized archive type: %s" % filename
)
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory, using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % filename)
paths = {
filename: ('', extract_dir),
}
for base, dirs, files in os.walk(filename):
src, dst = paths[base]
for d in dirs:
paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
for f in files:
target = os.path.join(dst, f)
target = progress_filter(src + f, target)
if not target:
# skip non-files
continue
ensure_directory(target)
f = os.path.join(base, f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
"""Unpack zip `filename` to `extract_dir`
Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
of the `progress_filter` argument.
"""
if not zipfile.is_zipfile(filename):
raise UnrecognizedFormat("%s is not a zip file" % (filename,))
with zipfile.ZipFile(filename) as z:
for info in z.infolist():
name = info.filename
# don't extract absolute paths or ones with .. in them
if name.startswith('/') or '..' in name.split('/'):
continue
target = os.path.join(extract_dir, *name.split('/'))
target = progress_filter(name, target)
if not target:
continue
if name.endswith('/'):
# directory
ensure_directory(target)
else:
# file
ensure_directory(target)
data = z.read(info.filename)
with open(target, 'wb') as f:
f.write(data)
unix_attributes = info.external_attr >> 16
if unix_attributes:
os.chmod(target, unix_attributes)
def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
"""Resolve any links and extract link targets as normal files."""
while tar_member_obj is not None and (
tar_member_obj.islnk() or tar_member_obj.issym()):
linkpath = tar_member_obj.linkname
if tar_member_obj.issym():
base = posixpath.dirname(tar_member_obj.name)
linkpath = posixpath.join(base, linkpath)
linkpath = posixpath.normpath(linkpath)
tar_member_obj = tar_obj._getmember(linkpath)
is_file_or_dir = (
tar_member_obj is not None and
(tar_member_obj.isfile() or tar_member_obj.isdir())
)
if is_file_or_dir:
return tar_member_obj
raise LookupError('Got unknown file type')
def _iter_open_tar(tar_obj, extract_dir, progress_filter):
"""Emit member-destination pairs from a tar archive."""
# don't do any chowning!
tar_obj.chown = lambda *args: None
with contextlib.closing(tar_obj):
for member in tar_obj:
name = member.name
# don't extract absolute paths or ones with .. in them
if name.startswith('/') or '..' in name.split('/'):
continue
prelim_dst = os.path.join(extract_dir, *name.split('/'))
try:
member = _resolve_tar_file_or_dir(tar_obj, member)
except LookupError:
continue
final_dst = progress_filter(name, prelim_dst)
if not final_dst:
continue
if final_dst.endswith(os.sep):
final_dst = final_dst[:-1]
yield member, final_dst
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
"""Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
of the `progress_filter` argument.
"""
try:
tarobj = tarfile.open(filename)
except tarfile.TarError as e:
raise UnrecognizedFormat(
"%s is not a compressed or uncompressed tar file" % (filename,)
) from e
for member, final_dst in _iter_open_tar(
tarobj, extract_dir, progress_filter,
):
try:
# XXX Ugh
tarobj._extract_member(member, final_dst)
except tarfile.ExtractError:
# chown/chmod/mkfifo/mknode/makedev failed
pass
return True
extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile
|