1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
import os
import pkg_resources
from zope.interface import implements
from repoze.bfg.interfaces import IPackageOverrides
from repoze.bfg.threadlocal import get_current_registry
class OverrideProvider(pkg_resources.DefaultProvider):
def __init__(self, module):
pkg_resources.DefaultProvider.__init__(self, module)
self.module_name = module.__name__
def _get_overrides(self):
reg = get_current_registry()
overrides = reg.queryUtility(IPackageOverrides, self.module_name)
return overrides
def get_resource_filename(self, manager, resource_name):
""" Return a true filesystem path for resource_name,
co-ordinating the extraction with manager, if the resource
must be unpacked to the filesystem.
"""
overrides = self._get_overrides()
if overrides is not None:
filename = overrides.get_filename(resource_name)
if filename is not None:
return filename
return pkg_resources.DefaultProvider.get_resource_filename(
self, manager, resource_name)
def get_resource_stream(self, manager, resource_name):
""" Return a readable file-like object for resource_name."""
overrides = self._get_overrides()
if overrides is not None:
stream = overrides.get_stream(resource_name)
if stream is not None:
return stream
return pkg_resources.DefaultProvider.get_resource_stream(
self, manager, resource_name)
def get_resource_string(self, manager, resource_name):
""" Return a string containing the contents of resource_name."""
overrides = self._get_overrides()
if overrides is not None:
string = overrides.get_string(resource_name)
if string is not None:
return string
return pkg_resources.DefaultProvider.get_resource_string(
self, manager, resource_name)
def has_resource(self, resource_name):
overrides = self._get_overrides()
if overrides is not None:
result = overrides.has_resource(resource_name)
if result is not None:
return result
return pkg_resources.DefaultProvider.has_resource(
self, resource_name)
def resource_isdir(self, resource_name):
overrides = self._get_overrides()
if overrides is not None:
result = overrides.isdir(resource_name)
if result is not None:
return result
return pkg_resources.DefaultProvider.resource_isdir(
self, resource_name)
def resource_listdir(self, resource_name):
overrides = self._get_overrides()
if overrides is not None:
result = overrides.listdir(resource_name)
if result is not None:
return result
return pkg_resources.DefaultProvider.resource_listdir(
self, resource_name)
class PackageOverrides:
implements(IPackageOverrides)
# pkg_resources arg in kw args below for testing
def __init__(self, package, pkg_resources=pkg_resources):
if hasattr(package, '__loader__') and not isinstance(package.__loader__,
self.__class__):
raise TypeError('Package %s already has a non-%s __loader__ '
'(probably a module in a zipped egg)' %
(package, self.__class__))
# We register ourselves as a __loader__ *only* to support the
# setuptools _find_adapter adapter lookup; this class doesn't
# actually support the PEP 302 loader "API". This is
# excusable due to the following statement in the spec:
# ... Loader objects are not
# required to offer any useful functionality (any such functionality,
# such as the zipimport get_data() method mentioned above, is
# optional)...
# A __loader__ attribute is basically metadata, and setuptools
# uses it as such.
package.__loader__ = self
# we call register_loader_type for every instantiation of this
# class; that's OK, it's idempotent to do it more than once.
pkg_resources.register_loader_type(self.__class__, OverrideProvider)
self.overrides = []
self.overridden_package_name = package.__name__
def insert(self, path, package, prefix):
if not path or path.endswith('/'):
override = DirectoryOverride(path, package, prefix)
else:
override = FileOverride(path, package, prefix)
self.overrides.insert(0, override)
return override
def search_path(self, resource_name):
for override in self.overrides:
o = override(resource_name)
if o is not None:
package, name = o
yield package, name
def get_filename(self, resource_name):
for package, rname in self.search_path(resource_name):
if pkg_resources.resource_exists(package, rname):
return pkg_resources.resource_filename(package, rname)
def get_stream(self, resource_name):
for package, rname in self.search_path(resource_name):
if pkg_resources.resource_exists(package, rname):
return pkg_resources.resource_stream(package, rname)
def get_string(self, resource_name):
for package, rname in self.search_path(resource_name):
if pkg_resources.resource_exists(package, rname):
return pkg_resources.resource_string(package, rname)
def has_resource(self, resource_name):
for package, rname in self.search_path(resource_name):
if pkg_resources.resource_exists(package, rname):
return True
def isdir(self, resource_name):
for package, rname in self.search_path(resource_name):
if pkg_resources.resource_exists(package, rname):
return pkg_resources.resource_isdir(package, rname)
def listdir(self, resource_name):
for package, rname in self.search_path(resource_name):
if pkg_resources.resource_exists(package, rname):
return pkg_resources.resource_listdir(package, rname)
class DirectoryOverride:
def __init__(self, path, package, prefix):
self.path = path
self.package = package
self.prefix = prefix
self.pathlen = len(self.path)
def __call__(self, resource_name):
if resource_name.startswith(self.path):
name = '%s%s' % (self.prefix, resource_name[self.pathlen:])
return self.package, name
class FileOverride:
def __init__(self, path, package, prefix):
self.path = path
self.package = package
self.prefix = prefix
def __call__(self, resource_name):
if resource_name == self.path:
return self.package, self.prefix
def resolve_resource_spec(spec, package_name='__main__'):
if os.path.isabs(spec):
return None, spec
filename = spec
if ':' in spec:
package_name, filename = spec.split(':', 1)
elif package_name is None:
package_name, filename = None, spec
return package_name, filename
|