Project

General

Profile

Statistics
| Branch: | Revision:

root / env / lib / python2.7 / site-packages / distribute-0.6.19-py2.7.egg / setuptools / archive_util.py @ 1a305335

History | View | Annotate | Download (5.79 KB)

1 1a305335 officers
"""Utilities for extracting common archive formats"""
2
3
4
__all__ = [
5
    "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
6
    "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
7
]
8
9
import zipfile, tarfile, os, shutil
10
from pkg_resources import ensure_directory
11
from distutils.errors import DistutilsError
12
13
class UnrecognizedFormat(DistutilsError):
14
    """Couldn't recognize the archive type"""
15
16
def default_filter(src,dst):
17
    """The default progress/filter callback; returns True for all files"""   
18
    return dst
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def unpack_archive(filename, extract_dir, progress_filter=default_filter,
43
    drivers=None
44
):
45
    """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
46

47
    `progress_filter` is a function taking two arguments: a source path
48
    internal to the archive ('/'-separated), and a filesystem path where it
49
    will be extracted.  The callback must return the desired extract path
50
    (which may be the same as the one passed in), or else ``None`` to skip
51
    that file or directory.  The callback can thus be used to report on the
52
    progress of the extraction, as well as to filter the items extracted or
53
    alter their extraction paths.
54

55
    `drivers`, if supplied, must be a non-empty sequence of functions with the
56
    same signature as this function (minus the `drivers` argument), that raise
57
    ``UnrecognizedFormat`` if they do not support extracting the designated
58
    archive type.  The `drivers` are tried in sequence until one is found that
59
    does not raise an error, or until all are exhausted (in which case
60
    ``UnrecognizedFormat`` is raised).  If you do not supply a sequence of
61
    drivers, the module's ``extraction_drivers`` constant will be used, which
62
    means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
63
    order.
64
    """
65
    for driver in drivers or extraction_drivers:
66
        try:
67
            driver(filename, extract_dir, progress_filter)
68
        except UnrecognizedFormat:
69
            continue
70
        else:
71
            return
72
    else:
73
        raise UnrecognizedFormat(
74
            "Not a recognized archive type: %s" % filename
75
        )
76
77
78
79
80
81
82
83
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
84
    """"Unpack" a directory, using the same interface as for archives
85

86
    Raises ``UnrecognizedFormat`` if `filename` is not a directory
87
    """
88
    if not os.path.isdir(filename):
89
        raise UnrecognizedFormat("%s is not a directory" % (filename,))
90
91
    paths = {filename:('',extract_dir)}
92
    for base, dirs, files in os.walk(filename):
93
        src,dst = paths[base]
94
        for d in dirs:
95
            paths[os.path.join(base,d)] = src+d+'/', os.path.join(dst,d)
96
        for f in files:
97
            name = src+f
98
            target = os.path.join(dst,f)
99
            target = progress_filter(src+f, target)
100
            if not target:
101
                continue    # skip non-files
102
            ensure_directory(target)
103
            f = os.path.join(base,f)
104
            shutil.copyfile(f, target)
105
            shutil.copystat(f, target)
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
125
    """Unpack zip `filename` to `extract_dir`
126

127
    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
128
    by ``zipfile.is_zipfile()``).  See ``unpack_archive()`` for an explanation
129
    of the `progress_filter` argument.
130
    """
131
132
    if not zipfile.is_zipfile(filename):
133
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))
134
135
    z = zipfile.ZipFile(filename)
136
    try:
137
        for info in z.infolist():
138
            name = info.filename
139
140
            # don't extract absolute paths or ones with .. in them
141
            if name.startswith('/') or '..' in name:
142
                continue
143
144
            target = os.path.join(extract_dir, *name.split('/'))
145
            target = progress_filter(name, target)
146
            if not target:
147
                continue
148
            if name.endswith('/'):
149
                # directory
150
                ensure_directory(target)
151
            else:
152
                # file
153
                ensure_directory(target)
154
                data = z.read(info.filename)
155
                f = open(target,'wb')
156
                try:
157
                    f.write(data)
158
                finally:
159
                    f.close()
160
                    del data
161
    finally:
162
        z.close()
163
164
165
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
166
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
167

168
    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
169
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
170
    of the `progress_filter` argument.
171
    """
172
173
    try:
174
        tarobj = tarfile.open(filename)
175
    except tarfile.TarError:
176
        raise UnrecognizedFormat(
177
            "%s is not a compressed or uncompressed tar file" % (filename,)
178
        )
179
180
    try:
181
        tarobj.chown = lambda *args: None   # don't do any chowning!
182
        for member in tarobj:
183
            if member.isfile() or member.isdir():
184
                name = member.name
185
                # don't extract absolute paths or ones with .. in them
186
                if not name.startswith('/') and '..' not in name:
187
                    dst = os.path.join(extract_dir, *name.split('/'))                
188
                    dst = progress_filter(name, dst)
189
                    if dst:
190
                        if dst.endswith(os.sep):
191
                            dst = dst[:-1]
192
                        try:
193
                            tarobj._extract_member(member,dst)  # XXX Ugh
194
                        except tarfile.ExtractError:
195
                            pass    # chown/chmod/mkfifo/mknode/makedev failed
196
        return True
197
    finally:
198
        tarobj.close()
199
200
201
202
203
extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile
204
205
206
207